package com.bw.app.dws;


import com.bw.bean.UserBabyInfo;
import com.bw.utils.MyClickHouseUtil;
import com.bw.utils.MyKafkaUtil;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Dws_gd12 {
    public static void main(String[] args) throws Exception {
        // Environment setup: streaming execution environment (parallelism 1) plus its table environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

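        // Gender: gender-tendency score = Σ(product category weight × behavior weight × behavior count × time-decay coefficient)
// Columns: user id, product title, male score, female score, behavior type, behavior score, operation time. Source: user table JOIN log table JOIN product table.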

        tEnv.executeSql("create table dwd_user_join( " +
                "user_id string, " +
                "product_title string, " +
                "behavior_type string, " +
                "behavior_time string " +
                ")" + MyKafkaUtil.getKafkaDDL("dwd_user_join_topic", "user_join"));


        Table gender = tEnv.sqlQuery("WITH t1 AS (  " +
                "    SELECT   " +
                "        user_id,  " +
                "        product_title,  " +
                "        CASE  " +
                "            WHEN product_title LIKE '%男童外套%' OR product_title LIKE '%男童运动鞋%' OR product_title LIKE '%电动遥控玩具%' OR product_title LIKE '%儿童机器人%' OR product_title LIKE '%男孩专属玩具%' THEN '1'  " +
                "            WHEN product_title LIKE '%连身衣%' OR product_title LIKE '%爬服%' OR product_title LIKE '%哈衣%' OR product_title LIKE '%婴儿礼盒%' OR product_title LIKE '%STEAM 教具%' OR product_title LIKE '%积木%' OR product_title LIKE '%绘画用品%' THEN '0.5'  " +
                "            WHEN product_title LIKE '%婴儿护肤%' OR product_title LIKE '%奶瓶%' OR product_title LIKE '%湿巾%' THEN '0.2'  " +
                "            ELSE '0'  " +
                "        END AS man_fraction,  " +
                "        CASE  " +
                "            WHEN product_title LIKE '%女童连衣裙%' OR product_title LIKE '%女童套装%' OR product_title LIKE '%女童裙子%' OR product_title LIKE '%洋娃娃%' OR product_title LIKE '%公主系列%' THEN '1'  " +
                "            WHEN product_title LIKE '%连身衣%' OR product_title LIKE '%爬服%' OR product_title LIKE '%哈衣%' OR product_title LIKE '%婴儿礼盒%' OR product_title LIKE '%STEAM 教具%' OR product_title LIKE '%积木%' OR product_title LIKE '%绘画用品%' THEN '0.5'  " +
                "            WHEN product_title LIKE '%婴儿护肤%' OR product_title LIKE '%奶瓶%' OR product_title LIKE '%湿巾%' THEN '0.2'  " +
                "            ELSE '0'  " +
                "        END AS woman_fraction,  " +
                "        behavior_type,  " +
                "        CASE  " +
                "            WHEN behavior_type LIKE '%下单%' THEN '1'  " +
                "            WHEN behavior_type LIKE '%加购%' THEN '0.5'  " +
                "            WHEN behavior_type LIKE '%收藏%' THEN '0.4'  " +
                "            WHEN behavior_type LIKE '%浏览%' THEN '0.2'  " +
                "            ELSE '0'  " +
                "        END AS type_fraction,  " +
                "        behavior_time,  " +
                "        CASE  " +
                "            WHEN (UNIX_TIMESTAMP(CAST(CURRENT_DATE AS STRING), 'yyyy-MM-dd') - UNIX_TIMESTAMP(SUBSTR(behavior_time, 1, 10), 'yyyy-MM-dd')) / 86400 <= 30 THEN '1'  " +
                "            WHEN (UNIX_TIMESTAMP(CAST(CURRENT_DATE AS STRING), 'yyyy-MM-dd') - UNIX_TIMESTAMP(SUBSTR(behavior_time, 1, 10), 'yyyy-MM-dd')) / 86400 <= 60 THEN '0.5'  " +
                "            ELSE '0.2'  " +
                "        END AS time_fraction  " +
                "    FROM dwd_user_join  " +
                "),  " +
                "t2 AS (  " +
                "    SELECT   " +
                "        user_id,  " +
                "        product_title,  " +
                "        man_fraction,  " +
                "        woman_fraction,  " +
                "        behavior_type,    " +
                "        type_fraction,  " +
                "        behavior_time,  " +
                "        (CAST(man_fraction AS DECIMAL(10,2)) * CAST(type_fraction AS DECIMAL(10,2)) * CAST(time_fraction AS DECIMAL(10,2))) AS man,  " +
                "        (CAST(woman_fraction AS DECIMAL(10,2)) * CAST(type_fraction AS DECIMAL(10,2)) * CAST(time_fraction AS DECIMAL(10,2))) AS woman  " +
                "    FROM t1  " +
                "),  " +
                "t3 AS (  " +
                "    SELECT   " +
                "        user_id,  " +
                "        SUM(man) AS man,    " +
                "        SUM(woman) AS woman   " +
                "    FROM t2  " +
                "    GROUP BY user_id  " +
                ")  " +
                "SELECT  " +
                "    user_id,  " +
                "    CASE  " +
                "        WHEN man > woman THEN '男宝'  " +
                "        WHEN man < woman THEN '女宝'  " +
                "        ELSE '未识别'  " +
                "    END AS gender  " +
                "FROM t3");

        tEnv.createTemporaryView("user_baby_gender",gender);

//        tEnv.executeSql("select * from user_baby_gender").print();


        // Build the Flink SQL query that predicts the user's age range.
//        Table age = tEnv.sqlQuery("WITH t1 AS (   " +
//                "select    " +
//                "user_id,   " +
//                "product_title,   " +
//                " CASE    " +
//                "    WHEN product_title LIKE '%孕妇装%' OR product_title LIKE '%防辐射服%' OR product_title LIKE '%孕妇裤%' OR product_title LIKE '%待产包%' OR product_title LIKE '%叶酸%' OR product_title LIKE '%钙铁锌%' THEN 1     " +
//                "    ELSE 0   " +
//                "    END as pregnancy_early,   " +
//                " CASE   " +
//                "    WHEN product_title LIKE '%孕妇装%' OR product_title LIKE '%孕产妇护肤用品%' OR product_title LIKE '%孕妇奶粉%' OR product_title LIKE '%胎教产品%' THEN 1     " +
//                "    ELSE 0   " +
//                "    END as pregnancy_mid,   " +
//                " CASE   " +
//                "    WHEN product_title LIKE '%婴儿床%' OR product_title LIKE '%奶瓶%' OR product_title LIKE '%吸奶器%' OR product_title LIKE '%乳头霜%' OR product_title LIKE '%月子餐%'  THEN 1     " +
//                "    ELSE 0   " +
//                "    END as pregnancy_late,   " +
//                " CASE   " +
//                "    WHEN product_title LIKE '%婴儿纸尿裤%' OR product_title LIKE '%婴儿湿巾%' OR product_title LIKE '%婴儿洗护用品%' OR product_title LIKE '%婴儿服饰%' THEN 1     " +
//                "    ELSE 0   " +
//                "    END as infant_0_3m,   " +
//                " CASE   " +
//                "    WHEN product_title LIKE '%米粉%' OR product_title LIKE '%果泥%' OR product_title LIKE '%婴儿奶瓶%' OR product_title LIKE '%婴儿安全座椅%' OR product_title LIKE '%婴儿玩具%'  THEN 1     " +
//                "    ELSE 0   " +
//                "    END as infant_3_6m,   " +
//                " CASE   " +
//                "    WHEN product_title LIKE '%婴儿学步鞋%' OR product_title LIKE '%儿童奶粉%' OR product_title LIKE '%DHA%' OR product_title LIKE '%益生菌%' OR product_title LIKE '%早教玩具%' THEN 1     " +
//                "    ELSE 0   " +
//                "    END as infant_6_12m,   " +
//                " CASE   " +
//                "    WHEN product_title LIKE '%学步鞋%' OR product_title LIKE '%连体裤%' OR product_title LIKE '%幼儿辅食（零食、调味品）%' OR product_title LIKE '%幼儿安全座椅%' OR product_title LIKE '%学步车%' THEN 1     " +
//                "    ELSE 0   " +
//                "    END as toddler_12_24m,   " +
//                " CASE   " +
//                "    WHEN product_title LIKE '%幼儿玩具%' OR product_title LIKE '%幼儿服饰%' OR product_title LIKE '%儿童绘本%' OR product_title LIKE '%学饮杯%'  THEN 1     " +
//                "    ELSE 0   " +
//                "    END as toddler_24_36m,   " +
//                " CASE   " +
//                "    WHEN product_title LIKE '%T 恤%' OR product_title LIKE '%裤子%' OR product_title LIKE '%儿童营养品%' OR product_title LIKE '%拼图%' OR product_title LIKE '%积木%' OR product_title LIKE '%儿童读物%' THEN 1     " +
//                "    ELSE 0   " +
//                "    END as child_3_6y,   " +
//                " CASE   " +
//                "    WHEN product_title LIKE '%儿童运动鞋%' OR product_title LIKE '%儿童文具%' OR product_title LIKE '%早教机%' OR product_title LIKE '%儿童零食%' OR product_title LIKE '%儿童护肤品%' THEN 1     " +
//                "    ELSE 0   " +
//                "    END as child_6_12y,   " +
//                "behavior_type,   " +
//                "CASE   " +
//                "         WHEN behavior_type LIKE '%下单%' THEN '1'   " +
//                "         WHEN behavior_type LIKE '%加购%' THEN '0.5'   " +
//                "         WHEN behavior_type LIKE '%收藏%' THEN '0.4'   " +
//                "         WHEN behavior_type LIKE '%浏览%' THEN '0.2'   " +
//                "            ELSE '0'   " +
//                "        END AS type_fraction   " +
//                "FROM dwd_user_join   " +
//                "),    " +
//                " t2 AS (     " +
//                "       SELECT     " +
//                "        user_id,     " +
//                "        SUM(CAST(type_fraction AS DECIMAL(3,1)) * pregnancy_early) AS pregnancy_early_score,     " +
//                "        SUM(CAST(type_fraction AS DECIMAL(3,1)) * pregnancy_mid) AS pregnancy_mid_score,     " +
//                "        SUM(CAST(type_fraction AS DECIMAL(3,1)) * pregnancy_late) AS pregnancy_late_score,   " +
//                "        SUM(CAST(type_fraction AS DECIMAL(3,1)) * infant_0_3m) AS infant_0_3m_score,     " +
//                "        SUM(CAST(type_fraction AS DECIMAL(3,1)) * infant_3_6m) AS infant_3_6m_score,   " +
//                "        SUM(CAST(type_fraction AS DECIMAL(3,1)) * infant_6_12m) AS infant_6_12m_score,    " +
//                "        SUM(CAST(type_fraction AS DECIMAL(3,1)) * toddler_12_24m) AS toddler_12_24m_score,     " +
//                "        SUM(CAST(type_fraction AS DECIMAL(3,1)) * toddler_24_36m) AS toddler_24_36m_score,    " +
//                "        SUM(CAST(type_fraction AS DECIMAL(3,1)) * child_3_6y) AS child_3_6y_score,     " +
//                "        SUM(CAST(type_fraction AS DECIMAL(3,1)) * child_6_12y) AS child_6_12y_score    " +
//                "        FROM t1     " +
//                "        GROUP BY user_id    " +
//                " )     " +
//                "SELECT     " +
//                "       user_id,     " +
//                "       CASE     " +
//                // 孕早期：比较其他8个分数的最大值
//                "           WHEN pregnancy_early_score >= ( " +
//                "               CASE " +
//                "                   WHEN pregnancy_mid_score >= pregnancy_late_score AND pregnancy_mid_score >= infant_0_3m_score AND pregnancy_mid_score >= infant_3_6m_score AND pregnancy_mid_score >= infant_6_12m_score AND pregnancy_mid_score >= toddler_12_24m_score AND pregnancy_mid_score >= toddler_24_36m_score AND pregnancy_mid_score >= child_3_6y_score AND pregnancy_mid_score >= child_6_12y_score THEN pregnancy_mid_score " +
//                "                   WHEN pregnancy_late_score >= infant_0_3m_score AND pregnancy_late_score >= infant_3_6m_score AND pregnancy_late_score >= infant_6_12m_score AND pregnancy_late_score >= toddler_12_24m_score AND pregnancy_late_score >= toddler_24_36m_score AND pregnancy_late_score >= child_3_6y_score AND pregnancy_late_score >= child_6_12y_score THEN pregnancy_late_score " +
//                "                   WHEN infant_0_3m_score >= infant_3_6m_score AND infant_0_3m_score >= infant_6_12m_score AND infant_0_3m_score >= toddler_12_24m_score AND infant_0_3m_score >= toddler_24_36m_score AND infant_0_3m_score >= child_3_6y_score AND infant_0_3m_score >= child_6_12y_score THEN infant_0_3m_score " +
//                "                   WHEN infant_3_6m_score >= infant_6_12m_score AND infant_3_6m_score >= toddler_12_24m_score AND infant_3_6m_score >= toddler_24_36m_score AND infant_3_6m_score >= child_3_6y_score AND infant_3_6m_score >= child_6_12y_score THEN infant_3_6m_score " +
//                "                   WHEN infant_6_12m_score >= toddler_12_24m_score AND infant_6_12m_score >= toddler_24_36m_score AND infant_6_12m_score >= child_3_6y_score AND infant_6_12m_score >= child_6_12y_score THEN infant_6_12m_score " +
//                "                   WHEN toddler_12_24m_score >= toddler_24_36m_score AND toddler_12_24m_score >= child_3_6y_score AND toddler_12_24m_score >= child_6_12y_score THEN toddler_12_24m_score " +
//                "                   WHEN toddler_24_36m_score >= child_3_6y_score AND toddler_24_36m_score >= child_6_12y_score THEN toddler_24_36m_score " +
//                "                   WHEN child_3_6y_score >= child_6_12y_score THEN child_3_6y_score " +
//                "                   ELSE child_6_12y_score " +
//                "               END " +
//                "           ) THEN '孕早期'     " +
//                // 孕中期：比较其他8个分数的最大值
//                "           WHEN pregnancy_mid_score >= ( " +
//                "               CASE " +
//                "                   WHEN pregnancy_early_score >= pregnancy_late_score AND pregnancy_early_score >= infant_0_3m_score AND pregnancy_early_score >= infant_3_6m_score AND pregnancy_early_score >= infant_6_12m_score AND pregnancy_early_score >= toddler_12_24m_score AND pregnancy_early_score >= toddler_24_36m_score AND pregnancy_early_score >= child_3_6y_score AND pregnancy_early_score >= child_6_12y_score THEN pregnancy_early_score " +
//                "                   WHEN pregnancy_late_score >= infant_0_3m_score AND pregnancy_late_score >= infant_3_6m_score AND pregnancy_late_score >= infant_6_12m_score AND pregnancy_late_score >= toddler_12_24m_score AND pregnancy_late_score >= toddler_24_36m_score AND pregnancy_late_score >= child_3_6y_score AND pregnancy_late_score >= child_6_12y_score THEN pregnancy_late_score " +
//                "                   WHEN infant_0_3m_score >= infant_3_6m_score AND infant_0_3m_score >= infant_6_12m_score AND infant_0_3m_score >= toddler_12_24m_score AND infant_0_3m_score >= toddler_24_36m_score AND infant_0_3m_score >= child_3_6y_score AND infant_0_3m_score >= child_6_12y_score THEN infant_0_3m_score " +
//                "                   WHEN infant_3_6m_score >= infant_6_12m_score AND infant_3_6m_score >= toddler_12_24m_score AND infant_3_6m_score >= toddler_24_36m_score AND infant_3_6m_score >= child_3_6y_score AND infant_3_6m_score >= child_6_12y_score THEN infant_3_6m_score " +
//                "                   WHEN infant_6_12m_score >= toddler_12_24m_score AND infant_6_12m_score >= toddler_24_36m_score AND infant_6_12m_score >= child_3_6y_score AND infant_6_12m_score >= child_6_12y_score THEN infant_6_12m_score " +
//                "                   WHEN toddler_12_24m_score >= toddler_24_36m_score AND toddler_12_24m_score >= child_3_6y_score AND toddler_12_24m_score >= child_6_12y_score THEN toddler_12_24m_score " +
//                "                   WHEN toddler_24_36m_score >= child_3_6y_score AND toddler_24_36m_score >= child_6_12y_score THEN toddler_24_36m_score " +
//                "                   WHEN child_3_6y_score >= child_6_12y_score THEN child_3_6y_score " +
//                "                   ELSE child_6_12y_score " +
//                "               END " +
//                "           ) THEN '孕中期'     " +
//                // 孕晚期：比较其他8个分数的最大值
//                "           WHEN pregnancy_late_score >= ( " +
//                "               CASE " +
//                "                   WHEN pregnancy_early_score >= pregnancy_mid_score AND pregnancy_early_score >= infant_0_3m_score AND pregnancy_early_score >= infant_3_6m_score AND pregnancy_early_score >= infant_6_12m_score AND pregnancy_early_score >= toddler_12_24m_score AND pregnancy_early_score >= toddler_24_36m_score AND pregnancy_early_score >= child_3_6y_score AND pregnancy_early_score >= child_6_12y_score THEN pregnancy_early_score " +
//                "                   WHEN pregnancy_mid_score >= infant_0_3m_score AND pregnancy_mid_score >= infant_3_6m_score AND pregnancy_mid_score >= infant_6_12m_score AND pregnancy_mid_score >= toddler_12_24m_score AND pregnancy_mid_score >= toddler_24_36m_score AND pregnancy_mid_score >= child_3_6y_score AND pregnancy_mid_score >= child_6_12y_score THEN pregnancy_mid_score " +
//                "                   WHEN infant_0_3m_score >= infant_3_6m_score AND infant_0_3m_score >= infant_6_12m_score AND infant_0_3m_score >= toddler_12_24m_score AND infant_0_3m_score >= toddler_24_36m_score AND infant_0_3m_score >= child_3_6y_score AND infant_0_3m_score >= child_6_12y_score THEN infant_0_3m_score " +
//                "                   WHEN infant_3_6m_score >= infant_6_12m_score AND infant_3_6m_score >= toddler_12_24m_score AND infant_3_6m_score >= toddler_24_36m_score AND infant_3_6m_score >= child_3_6y_score AND infant_3_6m_score >= child_6_12y_score THEN infant_3_6m_score " +
//                "                   WHEN infant_6_12m_score >= toddler_12_24m_score AND infant_6_12m_score >= toddler_24_36m_score AND infant_6_12m_score >= child_3_6y_score AND infant_6_12m_score >= child_6_12y_score THEN infant_6_12m_score " +
//                "                   WHEN toddler_12_24m_score >= toddler_24_36m_score AND toddler_12_24m_score >= child_3_6y_score AND toddler_12_24m_score >= child_6_12y_score THEN toddler_12_24m_score " +
//                "                   WHEN toddler_24_36m_score >= child_3_6y_score AND toddler_24_36m_score >= child_6_12y_score THEN toddler_24_36m_score " +
//                "                   WHEN child_3_6y_score >= child_6_12y_score THEN child_3_6y_score " +
//                "                   ELSE child_6_12y_score " +
//                "               END " +
//                "           ) THEN '孕晚期'     " +
//                // 0-3个月：比较其他8个分数的最大值
//                "           WHEN infant_0_3m_score >= ( " +
//                "               CASE " +
//                "                   WHEN pregnancy_early_score >= pregnancy_mid_score AND pregnancy_early_score >= pregnancy_late_score AND pregnancy_early_score >= infant_3_6m_score AND pregnancy_early_score >= infant_6_12m_score AND pregnancy_early_score >= toddler_12_24m_score AND pregnancy_early_score >= toddler_24_36m_score AND pregnancy_early_score >= child_3_6y_score AND pregnancy_early_score >= child_6_12y_score THEN pregnancy_early_score " +
//                "                   WHEN pregnancy_mid_score >= pregnancy_late_score AND pregnancy_mid_score >= infant_3_6m_score AND pregnancy_mid_score >= infant_6_12m_score AND pregnancy_mid_score >= toddler_12_24m_score AND pregnancy_mid_score >= toddler_24_36m_score AND pregnancy_mid_score >= child_3_6y_score AND pregnancy_mid_score >= child_6_12y_score THEN pregnancy_mid_score " +
//                "                   WHEN pregnancy_late_score >= infant_3_6m_score AND pregnancy_late_score >= infant_6_12m_score AND pregnancy_late_score >= toddler_12_24m_score AND pregnancy_late_score >= toddler_24_36m_score AND pregnancy_late_score >= child_3_6y_score AND pregnancy_late_score >= child_6_12y_score THEN pregnancy_late_score " +
//                "                   WHEN infant_3_6m_score >= infant_6_12m_score AND infant_3_6m_score >= toddler_12_24m_score AND infant_3_6m_score >= toddler_24_36m_score AND infant_3_6m_score >= child_3_6y_score AND infant_3_6m_score >= child_6_12y_score THEN infant_3_6m_score " +
//                "                   WHEN infant_6_12m_score >= toddler_12_24m_score AND infant_6_12m_score >= toddler_24_36m_score AND infant_6_12m_score >= child_3_6y_score AND infant_6_12m_score >= child_6_12y_score THEN infant_6_12m_score " +
//                "                   WHEN toddler_12_24m_score >= toddler_24_36m_score AND toddler_12_24m_score >= child_3_6y_score AND toddler_12_24m_score >= child_6_12y_score THEN toddler_12_24m_score " +
//                "                   WHEN toddler_24_36m_score >= child_3_6y_score AND toddler_24_36m_score >= child_6_12y_score THEN toddler_24_36m_score " +
//                "                   WHEN child_3_6y_score >= child_6_12y_score THEN child_3_6y_score " +
//                "                   ELSE child_6_12y_score " +
//                "               END " +
//                "           ) THEN '0-3个月'     " +
//                // 3-6个月：比较其他8个分数的最大值
//                "           WHEN infant_3_6m_score >= ( " +
//                "               CASE " +
//                "                   WHEN pregnancy_early_score >= pregnancy_mid_score AND pregnancy_early_score >= pregnancy_late_score AND pregnancy_early_score >= infant_0_3m_score AND pregnancy_early_score >= infant_6_12m_score AND pregnancy_early_score >= toddler_12_24m_score AND pregnancy_early_score >= toddler_24_36m_score AND pregnancy_early_score >= child_3_6y_score AND pregnancy_early_score >= child_6_12y_score THEN pregnancy_early_score " +
//                "                   WHEN pregnancy_mid_score >= pregnancy_late_score AND pregnancy_mid_score >= infant_0_3m_score AND pregnancy_mid_score >= infant_6_12m_score AND pregnancy_mid_score >= toddler_12_24m_score AND pregnancy_mid_score >= toddler_24_36m_score AND pregnancy_mid_score >= child_3_6y_score AND pregnancy_mid_score >= child_6_12y_score THEN pregnancy_mid_score " +
//                "                   WHEN pregnancy_late_score >= infant_0_3m_score AND pregnancy_late_score >= infant_6_12m_score AND pregnancy_late_score >= toddler_12_24m_score AND pregnancy_late_score >= toddler_24_36m_score AND pregnancy_late_score >= child_3_6y_score AND pregnancy_late_score >= child_6_12y_score THEN pregnancy_late_score " +
//                "                   WHEN infant_0_3m_score >= infant_6_12m_score AND infant_0_3m_score >= toddler_12_24m_score AND infant_0_3m_score >= toddler_24_36m_score AND infant_0_3m_score >= child_3_6y_score AND infant_0_3m_score >= child_6_12y_score THEN infant_0_3m_score " +
//                "                   WHEN infant_6_12m_score >= toddler_12_24m_score AND infant_6_12m_score >= toddler_24_36m_score AND infant_6_12m_score >= child_3_6y_score AND infant_6_12m_score >= child_6_12y_score THEN infant_6_12m_score " +
//                "                   WHEN toddler_12_24m_score >= toddler_24_36m_score AND toddler_12_24m_score >= child_3_6y_score AND toddler_12_24m_score >= child_6_12y_score THEN toddler_12_24m_score " +
//                "                   WHEN toddler_24_36m_score >= child_3_6y_score AND toddler_24_36m_score >= child_6_12y_score THEN toddler_24_36m_score " +
//                "                   WHEN child_3_6y_score >= child_6_12y_score THEN child_3_6y_score " +
//                "                   ELSE child_6_12y_score " +
//                "               END " +
//                "           ) THEN '3-6个月'     " +
//                // 6-12个月：比较其他8个分数的最大值
//                "           WHEN infant_6_12m_score >= ( " +
//                "               CASE " +
//                "                   WHEN pregnancy_early_score >= pregnancy_mid_score AND pregnancy_early_score >= pregnancy_late_score AND pregnancy_early_score >= infant_0_3m_score AND pregnancy_early_score >= infant_3_6m_score AND pregnancy_early_score >= toddler_12_24m_score AND pregnancy_early_score >= toddler_24_36m_score AND pregnancy_early_score >= child_3_6y_score AND pregnancy_early_score >= child_6_12y_score THEN pregnancy_early_score " +
//                "                   WHEN pregnancy_mid_score >= pregnancy_late_score AND pregnancy_mid_score >= infant_0_3m_score AND pregnancy_mid_score >= infant_3_6m_score AND pregnancy_mid_score >= toddler_12_24m_score AND pregnancy_mid_score >= toddler_24_36m_score AND pregnancy_mid_score >= child_3_6y_score AND pregnancy_mid_score >= child_6_12y_score THEN pregnancy_mid_score " +
//                "                   WHEN pregnancy_late_score >= infant_0_3m_score AND pregnancy_late_score >= infant_3_6m_score AND pregnancy_late_score >= toddler_12_24m_score AND pregnancy_late_score >= toddler_24_36m_score AND pregnancy_late_score >= child_3_6y_score AND pregnancy_late_score >= child_6_12y_score THEN pregnancy_late_score " +
//                "                   WHEN infant_0_3m_score >= infant_3_6m_score AND infant_0_3m_score >= toddler_12_24m_score AND infant_0_3m_score >= toddler_24_36m_score AND infant_0_3m_score >= child_3_6y_score AND infant_0_3m_score >= child_6_12y_score THEN infant_0_3m_score " +
//                "                   WHEN infant_3_6m_score >= toddler_12_24m_score AND infant_3_6m_score >= toddler_24_36m_score AND infant_3_6m_score >= child_3_6y_score AND infant_3_6m_score >= child_6_12y_score THEN infant_3_6m_score " +
//                "                   WHEN toddler_12_24m_score >= toddler_24_36m_score AND toddler_12_24m_score >= child_3_6y_score AND toddler_12_24m_score >= child_6_12y_score THEN toddler_12_24m_score " +
//                "                   WHEN toddler_24_36m_score >= child_3_6y_score AND toddler_24_36m_score >= child_6_12y_score THEN toddler_24_36m_score " +
//                "                   WHEN child_3_6y_score >= child_6_12y_score THEN child_3_6y_score " +
//                "                   ELSE child_6_12y_score " +
//                "               END " +
//                "           ) THEN '6-12个月'     " +
//                // 12-24个月：比较其他8个分数的最大值
//                "           WHEN toddler_12_24m_score >= ( " +
//                "               CASE " +
//                "                   WHEN pregnancy_early_score >= pregnancy_mid_score AND pregnancy_early_score >= pregnancy_late_score AND pregnancy_early_score >= infant_0_3m_score AND pregnancy_early_score >= infant_3_6m_score AND pregnancy_early_score >= infant_6_12m_score AND pregnancy_early_score >= toddler_24_36m_score AND pregnancy_early_score >= child_3_6y_score AND pregnancy_early_score >= child_6_12y_score THEN pregnancy_early_score " +
//                "                   WHEN pregnancy_mid_score >= pregnancy_late_score AND pregnancy_mid_score >= infant_0_3m_score AND pregnancy_mid_score >= infant_3_6m_score AND pregnancy_mid_score >= infant_6_12m_score AND pregnancy_mid_score >= toddler_24_36m_score AND pregnancy_mid_score >= child_3_6y_score AND pregnancy_mid_score >= child_6_12y_score THEN pregnancy_mid_score " +
//                "                   WHEN pregnancy_late_score >= infant_0_3m_score AND pregnancy_late_score >= infant_3_6m_score AND pregnancy_late_score >= infant_6_12m_score AND pregnancy_late_score >= toddler_24_36m_score AND pregnancy_late_score >= child_3_6y_score AND pregnancy_late_score >= child_6_12y_score THEN pregnancy_late_score " +
//                "                   WHEN infant_0_3m_score >= infant_3_6m_score AND infant_0_3m_score >= infant_6_12m_score AND infant_0_3m_score >= toddler_24_36m_score AND infant_0_3m_score >= child_3_6y_score AND infant_0_3m_score >= child_6_12y_score THEN infant_0_3m_score " +
//                "                   WHEN infant_3_6m_score >= infant_6_12m_score AND infant_3_6m_score >= toddler_24_36m_score AND infant_3_6m_score >= child_3_6y_score AND infant_3_6m_score >= child_6_12y_score THEN infant_3_6m_score " +
//                "                   WHEN infant_6_12m_score >= toddler_24_36m_score AND infant_6_12m_score >= child_3_6y_score AND infant_6_12m_score >= child_6_12y_score THEN infant_6_12m_score " +
//                "                   WHEN toddler_24_36m_score >= child_3_6y_score AND toddler_24_36m_score >= child_6_12y_score THEN toddler_24_36m_score " +
//                "                   WHEN child_3_6y_score >= child_6_12y_score THEN child_3_6y_score " +
//                "                   ELSE child_6_12y_score " +
//                "               END " +
//                "           ) THEN '12-24个月'     " +
//                // 24-36个月：比较其他8个分数的最大值
//                "           WHEN toddler_24_36m_score >= ( " +
//                "               CASE " +
//                "                   WHEN pregnancy_early_score >= pregnancy_mid_score AND pregnancy_early_score >= pregnancy_late_score AND pregnancy_early_score >= infant_0_3m_score AND pregnancy_early_score >= infant_3_6m_score AND pregnancy_early_score >= infant_6_12m_score AND pregnancy_early_score >= toddler_12_24m_score AND pregnancy_early_score >= child_3_6y_score AND pregnancy_early_score >= child_6_12y_score THEN pregnancy_early_score " +
//                "                   WHEN pregnancy_mid_score >= pregnancy_late_score AND pregnancy_mid_score >= infant_0_3m_score AND pregnancy_mid_score >= infant_3_6m_score AND pregnancy_mid_score >= infant_6_12m_score AND pregnancy_mid_score >= toddler_12_24m_score AND pregnancy_mid_score >= child_3_6y_score AND pregnancy_mid_score >= child_6_12y_score THEN pregnancy_mid_score " +
//                "                   WHEN pregnancy_late_score >= infant_0_3m_score AND pregnancy_late_score >= infant_3_6m_score AND pregnancy_late_score >= infant_6_12m_score AND pregnancy_late_score >= toddler_12_24m_score AND pregnancy_late_score >= child_3_6y_score AND pregnancy_late_score >= child_6_12y_score THEN pregnancy_late_score " +
//                "                   WHEN infant_0_3m_score >= infant_3_6m_score AND infant_0_3m_score >= infant_6_12m_score AND infant_0_3m_score >= toddler_12_24m_score AND infant_0_3m_score >= child_3_6y_score AND infant_0_3m_score >= child_6_12y_score THEN infant_0_3m_score " +
//                "                   WHEN infant_3_6m_score >= infant_6_12m_score AND infant_3_6m_score >= toddler_12_24m_score AND infant_3_6m_score >= child_3_6y_score AND infant_3_6m_score >= child_6_12y_score THEN infant_3_6m_score " +
//                "                   WHEN infant_6_12m_score >= toddler_12_24m_score AND infant_6_12m_score >= child_3_6y_score AND infant_6_12m_score >= child_6_12y_score THEN infant_6_12m_score " +
//                "                   WHEN toddler_12_24m_score >= child_3_6y_score AND toddler_12_24m_score >= child_6_12y_score THEN toddler_12_24m_score " +
//                "                   WHEN child_3_6y_score >= child_6_12y_score THEN child_3_6y_score " +
//                "                   ELSE child_6_12y_score " +
//                "               END " +
//                "           ) THEN '24-36个月'     " +
//                // 3-6岁：比较其他8个分数的最大值
//                "           WHEN child_3_6y_score >= ( " +
//                "               CASE " +
//                "                   WHEN pregnancy_early_score >= pregnancy_mid_score AND pregnancy_early_score >= pregnancy_late_score AND pregnancy_early_score >= infant_0_3m_score AND pregnancy_early_score >= infant_3_6m_score AND pregnancy_early_score >= infant_6_12m_score AND pregnancy_early_score >= toddler_12_24m_score AND pregnancy_early_score >= toddler_24_36m_score AND pregnancy_early_score >= child_6_12y_score THEN pregnancy_early_score " +
//                "                   WHEN pregnancy_mid_score >= pregnancy_late_score AND pregnancy_mid_score >= infant_0_3m_score AND pregnancy_mid_score >= infant_3_6m_score AND pregnancy_mid_score >= infant_6_12m_score AND pregnancy_mid_score >= toddler_12_24m_score AND pregnancy_mid_score >= toddler_24_36m_score AND pregnancy_mid_score >= child_6_12y_score THEN pregnancy_mid_score " +
//                "                   WHEN pregnancy_late_score >= infant_0_3m_score AND pregnancy_late_score >= infant_3_6m_score AND pregnancy_late_score >= infant_6_12m_score AND pregnancy_late_score >= toddler_12_24m_score AND pregnancy_late_score >= toddler_24_36m_score AND pregnancy_late_score >= child_6_12y_score THEN pregnancy_late_score " +
//                "                   WHEN infant_0_3m_score >= infant_3_6m_score AND infant_0_3m_score >= infant_6_12m_score AND infant_0_3m_score >= toddler_12_24m_score AND infant_0_3m_score >= toddler_24_36m_score AND infant_0_3m_score >= child_6_12y_score THEN infant_0_3m_score " +
//                "                   WHEN infant_3_6m_score >= infant_6_12m_score AND infant_3_6m_score >= toddler_12_24m_score AND infant_3_6m_score >= toddler_24_36m_score AND infant_3_6m_score >= child_6_12y_score THEN infant_3_6m_score " +
//                "                   WHEN infant_6_12m_score >= toddler_12_24m_score AND infant_6_12m_score >= toddler_24_36m_score AND infant_6_12m_score >= child_6_12y_score THEN infant_6_12m_score " +
//                "                   WHEN toddler_12_24m_score >= toddler_24_36m_score AND toddler_12_24m_score >= child_6_12y_score THEN toddler_12_24m_score " +
//                "                   WHEN toddler_24_36m_score >= child_6_12y_score THEN toddler_24_36m_score " +
//                "                   ELSE child_6_12y_score " +
//                "               END " +
//                "           ) THEN '3-6岁'     " +
//                // 否则为6-12岁
//                "           ELSE '6-12岁'     " +
//                "       END AS predicted_age_range     " +
//                "FROM t2");
//// 阶段名称列表（用于后续复用）
//        String[] stages = {
//                "pregnancy_early", "pregnancy_mid", "pregnancy_late",
//                "infant_0_3m", "infant_3_6m", "infant_6_12m",
//                "toddler_12_24m", "toddler_24_36m", "child_3_6y", "child_6_12y"
//        };

// Per-stage matching rules used by t1: each fragment is the body of a CASE expression that
// yields 1 when product_title contains one of the stage's keywords and 0 otherwise.
// NOTE(review): LIKE '%x%' is a substring match, so some stages overlap: '%奶瓶%' (late
// pregnancy) also matches titles containing '婴儿奶瓶' (3-6 months), and '%学步鞋%'
// (12-24 months) also matches '婴儿学步鞋' (6-12 months). Confirm this is intended.
// NOTE(review): '%幼儿辅食（零食、调味品）%' includes the parenthetical text literally and
// '%T 恤%' includes a space; verify these against real product titles.
        // Early pregnancy.
        String pregnancyEarlyCond =
                "WHEN product_title LIKE '%孕妇装%' "
                        + "OR product_title LIKE '%防辐射服%' "
                        + "OR product_title LIKE '%孕妇裤%' "
                        + "OR product_title LIKE '%待产包%' "
                        + "OR product_title LIKE '%叶酸%' "
                        + "OR product_title LIKE '%钙铁锌%' "
                        + "THEN 1 ELSE 0";
        // Mid pregnancy.
        String pregnancyMidCond =
                "WHEN product_title LIKE '%孕妇装%' "
                        + "OR product_title LIKE '%孕产妇护肤用品%' "
                        + "OR product_title LIKE '%孕妇奶粉%' "
                        + "OR product_title LIKE '%胎教产品%' "
                        + "THEN 1 ELSE 0";
        // Late pregnancy.
        String pregnancyLateCond =
                "WHEN product_title LIKE '%婴儿床%' "
                        + "OR product_title LIKE '%奶瓶%' "
                        + "OR product_title LIKE '%吸奶器%' "
                        + "OR product_title LIKE '%乳头霜%' "
                        + "OR product_title LIKE '%月子餐%' "
                        + "THEN 1 ELSE 0";
        // Infant, 0-3 months.
        String infant03mCond =
                "WHEN product_title LIKE '%婴儿纸尿裤%' "
                        + "OR product_title LIKE '%婴儿湿巾%' "
                        + "OR product_title LIKE '%婴儿洗护用品%' "
                        + "OR product_title LIKE '%婴儿服饰%' "
                        + "THEN 1 ELSE 0";
        // Infant, 3-6 months.
        String infant36mCond =
                "WHEN product_title LIKE '%米粉%' "
                        + "OR product_title LIKE '%果泥%' "
                        + "OR product_title LIKE '%婴儿奶瓶%' "
                        + "OR product_title LIKE '%婴儿安全座椅%' "
                        + "OR product_title LIKE '%婴儿玩具%' "
                        + "THEN 1 ELSE 0";
        // Infant, 6-12 months.
        String infant612mCond =
                "WHEN product_title LIKE '%婴儿学步鞋%' "
                        + "OR product_title LIKE '%儿童奶粉%' "
                        + "OR product_title LIKE '%DHA%' "
                        + "OR product_title LIKE '%益生菌%' "
                        + "OR product_title LIKE '%早教玩具%' "
                        + "THEN 1 ELSE 0";
        // Toddler, 12-24 months.
        String toddler1224mCond =
                "WHEN product_title LIKE '%学步鞋%' "
                        + "OR product_title LIKE '%连体裤%' "
                        + "OR product_title LIKE '%幼儿辅食（零食、调味品）%' "
                        + "OR product_title LIKE '%幼儿安全座椅%' "
                        + "OR product_title LIKE '%学步车%' "
                        + "THEN 1 ELSE 0";
        // Toddler, 24-36 months.
        String toddler2436mCond =
                "WHEN product_title LIKE '%幼儿玩具%' "
                        + "OR product_title LIKE '%幼儿服饰%' "
                        + "OR product_title LIKE '%儿童绘本%' "
                        + "OR product_title LIKE '%学饮杯%' "
                        + "THEN 1 ELSE 0";
        // Child, 3-6 years.
        String child36yCond =
                "WHEN product_title LIKE '%T 恤%' "
                        + "OR product_title LIKE '%裤子%' "
                        + "OR product_title LIKE '%儿童营养品%' "
                        + "OR product_title LIKE '%拼图%' "
                        + "OR product_title LIKE '%积木%' "
                        + "OR product_title LIKE '%儿童读物%' "
                        + "THEN 1 ELSE 0";
        // Child, 6-12 years.
        String child612yCond =
                "WHEN product_title LIKE '%儿童运动鞋%' "
                        + "OR product_title LIKE '%儿童文具%' "
                        + "OR product_title LIKE '%早教机%' "
                        + "OR product_title LIKE '%儿童零食%' "
                        + "OR product_title LIKE '%儿童护肤品%' "
                        + "THEN 1 ELSE 0";

// Behaviour weight column (type_fraction): maps behavior_type to a weight kept as a string
// literal ('1' order, '0.5' add-to-cart, '0.4' favourite, '0.2' browse, '0' anything else).
// The t2 CTE casts this string to DECIMAL before multiplying.
        String behaviorFractionCond = String.join(" ",
                "CASE",
                "WHEN behavior_type LIKE '%下单%' THEN '1'",
                "WHEN behavior_type LIKE '%加购%' THEN '0.5'",
                "WHEN behavior_type LIKE '%收藏%' THEN '0.4'",
                "WHEN behavior_type LIKE '%浏览%' THEN '0.2'",
                "ELSE '0'",
                "END AS type_fraction");

// CTE t1: one row per dwd_user_join record, carrying a 0/1 flag per age stage (derived from
// product_title) plus the behaviour weight column type_fraction.
        String t1Columns = String.join(", ",
                "user_id",
                "product_title",
                "CASE " + pregnancyEarlyCond + " END AS pregnancy_early",
                "CASE " + pregnancyMidCond + " END AS pregnancy_mid",
                "CASE " + pregnancyLateCond + " END AS pregnancy_late",
                "CASE " + infant03mCond + " END AS infant_0_3m",
                "CASE " + infant36mCond + " END AS infant_3_6m",
                "CASE " + infant612mCond + " END AS infant_6_12m",
                "CASE " + toddler1224mCond + " END AS toddler_12_24m",
                "CASE " + toddler2436mCond + " END AS toddler_24_36m",
                "CASE " + child36yCond + " END AS child_3_6y",
                "CASE " + child612yCond + " END AS child_6_12y",
                "behavior_type",
                behaviorFractionCond);
        String t1Sql = "t1 AS ( SELECT " + t1Columns + " FROM dwd_user_join )";

        // t2表：按用户聚合各阶段得分
        String t2Sql = "t2 AS ( " +
                "SELECT " +
                "user_id, " +
                "SUM(CAST(type_fraction AS DECIMAL(3,1)) * pregnancy_early) AS pregnancy_early_score, " +
                "SUM(CAST(type_fraction AS DECIMAL(3,1)) * pregnancy_mid) AS pregnancy_mid_score, " +
                "SUM(CAST(type_fraction AS DECIMAL(3,1)) * pregnancy_late) AS pregnancy_late_score, " +
                "SUM(CAST(type_fraction AS DECIMAL(3,1)) * infant_0_3m) AS infant_0_3m_score, " +
                "SUM(CAST(type_fraction AS DECIMAL(3,1)) * infant_3_6m) AS infant_3_6m_score, " +
                "SUM(CAST(type_fraction AS DECIMAL(3,1)) * infant_6_12m) AS infant_6_12m_score, " +
                "SUM(CAST(type_fraction AS DECIMAL(3,1)) * toddler_12_24m) AS toddler_12_24m_score, " +
                "SUM(CAST(type_fraction AS DECIMAL(3,1)) * toddler_24_36m) AS toddler_24_36m_score, " +
                "SUM(CAST(type_fraction AS DECIMAL(3,1)) * child_3_6y) AS child_3_6y_score, " +
                "SUM(CAST(type_fraction AS DECIMAL(3,1)) * child_6_12y) AS child_6_12y_score " +
                "FROM t1 " +
                "GROUP BY user_id " +
                ")";

        // CTE t3: for each user, find the best score/stage within two groups of stages, both
        // computed from the same t2 row:
        //   group 1 = early/mid/late pregnancy, 0-3 months, 3-6 months   -> top_score_1 / top_stage_1
        //   group 2 = 6-12 months, 12-24 months, 24-36 months, 3-6y, 6-12y -> top_score_2 / top_stage_2
        // Each CASE is a cascading maximum: a WHEN is only reached when every earlier candidate
        // lost to someone, so comparing against the remaining candidates is sufficient. Ties go
        // to the earlier stage because the comparisons use >=.
        //
        // Both groups used to live in separate CTEs that were joined back on user_id. In a
        // streaming query that is a regular join between two updating inputs derived from the
        // same aggregate: it keeps both sides in state indefinitely and can emit intermediate
        // rows that pair a fresh value from one side with a stale value from the other.
        // Computing all four columns in a single SELECT avoids the join altogether.
        //
        // NOTE(review): a user whose scores are all 0 satisfies the first WHEN of each group and
        // is therefore labelled '孕早期' — confirm whether such users should be reported at all.
        String t3Sql = "t3 AS ( " +
                "SELECT " +
                "user_id, " +
                // Highest score within group 1.
                "CASE " +
                "WHEN pregnancy_early_score >= pregnancy_mid_score AND pregnancy_early_score >= pregnancy_late_score AND pregnancy_early_score >= infant_0_3m_score AND pregnancy_early_score >= infant_3_6m_score THEN pregnancy_early_score " +
                "WHEN pregnancy_mid_score >= pregnancy_late_score AND pregnancy_mid_score >= infant_0_3m_score AND pregnancy_mid_score >= infant_3_6m_score THEN pregnancy_mid_score " +
                "WHEN pregnancy_late_score >= infant_0_3m_score AND pregnancy_late_score >= infant_3_6m_score THEN pregnancy_late_score " +
                "WHEN infant_0_3m_score >= infant_3_6m_score THEN infant_0_3m_score " +
                "ELSE infant_3_6m_score " +
                "END AS top_score_1, " +
                // Stage label belonging to top_score_1 (same conditions, label instead of score).
                "CASE " +
                "WHEN pregnancy_early_score >= pregnancy_mid_score AND pregnancy_early_score >= pregnancy_late_score AND pregnancy_early_score >= infant_0_3m_score AND pregnancy_early_score >= infant_3_6m_score THEN '孕早期' " +
                "WHEN pregnancy_mid_score >= pregnancy_late_score AND pregnancy_mid_score >= infant_0_3m_score AND pregnancy_mid_score >= infant_3_6m_score THEN '孕中期' " +
                "WHEN pregnancy_late_score >= infant_0_3m_score AND pregnancy_late_score >= infant_3_6m_score THEN '孕晚期' " +
                "WHEN infant_0_3m_score >= infant_3_6m_score THEN '0-3个月' " +
                "ELSE '3-6个月' " +
                "END AS top_stage_1, " +
                // Highest score within group 2.
                "CASE " +
                "WHEN infant_6_12m_score >= toddler_12_24m_score AND infant_6_12m_score >= toddler_24_36m_score AND infant_6_12m_score >= child_3_6y_score AND infant_6_12m_score >= child_6_12y_score THEN infant_6_12m_score " +
                "WHEN toddler_12_24m_score >= toddler_24_36m_score AND toddler_12_24m_score >= child_3_6y_score AND toddler_12_24m_score >= child_6_12y_score THEN toddler_12_24m_score " +
                "WHEN toddler_24_36m_score >= child_3_6y_score AND toddler_24_36m_score >= child_6_12y_score THEN toddler_24_36m_score " +
                "WHEN child_3_6y_score >= child_6_12y_score THEN child_3_6y_score " +
                "ELSE child_6_12y_score " +
                "END AS top_score_2, " +
                // Stage label belonging to top_score_2.
                "CASE " +
                "WHEN infant_6_12m_score >= toddler_12_24m_score AND infant_6_12m_score >= toddler_24_36m_score AND infant_6_12m_score >= child_3_6y_score AND infant_6_12m_score >= child_6_12y_score THEN '6-12个月' " +
                "WHEN toddler_12_24m_score >= toddler_24_36m_score AND toddler_12_24m_score >= child_3_6y_score AND toddler_12_24m_score >= child_6_12y_score THEN '12-24个月' " +
                "WHEN toddler_24_36m_score >= child_3_6y_score AND toddler_24_36m_score >= child_6_12y_score THEN '24-36个月' " +
                "WHEN child_3_6y_score >= child_6_12y_score THEN '3-6岁' " +
                "ELSE '6-12岁' " +
                "END AS top_stage_2 " +
                "FROM t2 " +
                ")";

        // CTE t4: final prediction — the winner of group 1 vs. group 2 (group 1 wins ties).
        // The NULL filter reproduces the previous inner join on user_id, which never matched
        // rows with a NULL key.
        String t4Sql = "t4 AS ( " +
                "SELECT " +
                "user_id, " +
                "CASE " +
                "WHEN top_score_1 >= top_score_2 THEN top_stage_1 " +
                "ELSE top_stage_2 " +
                "END AS predicted_age_range " +
                "FROM t3 " +
                "WHERE user_id IS NOT NULL " +
                ")";

        // Full statement: t1 (flags) -> t2 (scores) -> t3 (group winners) -> t4 (prediction).
        String fullSql = "WITH " + t1Sql + ", " + t2Sql + ", " + t3Sql + ", " + t4Sql + " " +
                "SELECT user_id, predicted_age_range FROM t4";
// Run the age-range query and register its result as the view "user_baby_age".
        tEnv.createTemporaryView("user_baby_age", tEnv.sqlQuery(fullSql));

        // Final result: one (uid, gender, age) row per user, combining the predicted age range
        // with the gender label from the user_baby_gender view.
        // NOTE(review): user_baby_gender is not registered in this part of main; it is
        // presumably created earlier from the gender query — confirm.
        String sinkSql = "select    " +
                "ua.user_id as uid,   " +
                "ug.gender as gender,    " +
                "ua.predicted_age_range as age   " +
                " from user_baby_age ua JOIN user_baby_gender ug on ua.user_id = ug.user_id ";
        Table sink = tEnv.sqlQuery(sinkSql);


//        tEnv.createTemporaryView("sink",sink);
        
//        tEnv.executeSql("CREATE TABLE sink_table (   " +
//                "    uid INT,           " +
//                "    gender STRING,     " +
//                "    age STRING,        " +
//                "    event_time TIMESTAMP(3),    " +
//                "    version BIGINT       " +
//                ") WITH (   " +
//                "    'connector' = 'clickhouse',   " +
//                "    'url' = 'clickhouse://hadoop102:8123/gmall',   " +
//                "    'table-name' = 'user_baby_flink'     " +
//                ")");


//        DataStream<Row> resultDS = tEnv.toAppendStream(sink, Row.class);
        // Convert the result table into a retract stream of (flag, row) pairs.
        DataStream<Tuple2<Boolean, Row>> resultDS = tEnv.toRetractStream(sink, Row.class);

        // Keep only the messages whose flag is true (inserts / new values of an update) and
        // discard the retraction messages.
        DataStream<Tuple2<Boolean, Row>> acceptedChanges = resultDS.filter(change -> change.f0);

        // Unwrap the Row payload; the explicit type hint is needed because the lambda's
        // return type is erased.
        DataStream<Row> appendOnlyStream = acceptedChanges
                .map(change -> change.f1)
                .returns(TypeInformation.of(Row.class));

        // Convert each result row into the UserBabyInfo POJO that is written to ClickHouse.
        // Fields are read by position, following the SELECT order of the sink query
        // (0 = uid, 1 = gender, 2 = age). Rows produced by the legacy toRetractStream
        // conversion may be position-based, in which case access by field name throws
        // IllegalArgumentException; positional access works for both position-based rows and
        // rows that carry names and positions.
        DataStream<UserBabyInfo> userInfoDS = appendOnlyStream.map(row -> {
            UserBabyInfo info = new UserBabyInfo();
            info.setUid(row.getFieldAs(0));     // uid
            info.setGender(row.getFieldAs(1));  // gender
            info.setAge(row.getFieldAs(2));     // age
            return info;
        }).returns(TypeInformation.of(UserBabyInfo.class));

        userInfoDS.print(">>>>>>>>>>>>");
        // 将数据写入 ClickHouse
        userInfoDS.addSink(MyClickHouseUtil.getSinkFunction(
                "INSERT INTO gmall.user_baby_flink (uid, gender, age) VALUES (?, ?, ?)"
        ));

        env.execute();

//        resultDS.addSink(MyClickHouseUtil.getSinkFunction("insert into dws_traffic_vc_ch_ar_is_new_page_view_window values(?,?,?,?,?,?,?,?,?,?,?,?)"));

//    年龄段    总得分=Σ（行为类型权重×商品品类对应年龄段权重）
//        用户id   行为类型 分  商品名  1分  2分 3分。。。    用户表join日志表join商品表







    }
}
